package com.pengpeng.rocketmq.consumer;

import com.pengpeng.rocketmq.config.JmsConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.util.List;


/**
 * @author 彭鹏
 * @date 2021/10/25.
 */
@Slf4j
@Component
public class BroadcastConsumer2 {
    /**
     * 消费者实体对象
     */
    private DefaultMQPushConsumer consumer;
    /**
     * 主题名称 主题一般是服务器设置好 而不能在代码里去新建topic（ 如果没有创建好，生产者往该主题发送消息 会报找不到topic错误）
     */
    public static final String TOPIC = "TOPIC_BROADCAST_CONSUMER";
    /**
     * 消费者组
     */
    public static final String CONSUMER_GROUP = "broadcast_group_name2";

    /**
     * 通过构造函数 实例化对象
     */
    public BroadcastConsumer2() throws MQClientException {
        consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setMessageModel(MessageModel.BROADCASTING);

        consumer.subscribe(TOPIC, "Broadcast1 || Broadcast2 || Broadcast3");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                try {
                    for (Message msg : msgs) {

                        //消费者获取消息 这里只输出 不做后面逻辑处理
                        String body = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
                        log.info("BroadcastConsumer2-获取消息-主题topic为={}, 消费消息为={}", msg.getTopic(), body);
                    }
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("BroadcastConsumer2消费者 启动成功=======");
    }
}
